 1.实时数据采集之配置Nginx
   
   架构流程
   车辆会装有各种传感器以及与后台通信的客户端，通过客户端把相关数据传送到后台。
   1).安装git工具，安装wget下载工具
   yum install wget git -y
   yum install gcc-c++ -y
   2).切换到/usr/local/src目录，然后将kafka的 客户端源码使用git clone到本地
   cd /usr/local/src
   git clone https://github.com/edenhill/librdkafka
   注意：要保证下载成功！！不能出现failed字样
   3).进入librdkafka目录，对kafka客户端源码进行编译
   cd /usr/local/src/librdkafka
   yum install -y gcc gcc-c++ pcre-devel zlib-devel
   ./configure
   make && make install
   注意：gcc编译的时候出现如下问题：
   collect2 cannot find 'ld'
   解决方式：yum reinstall binutils -y
   4).安装nginx 整合kafka的插件，进入到/usr/local/src目录下，
   使用git clone nginx整合kafka的源码
   cd /usr/local/src
   git clone https://github.com/brg-liuwei/ngx_kafka_module
   5).下载nginx源码包
   cd /usr/local/src
   wget http://nginx.org/download/nginx-1.17.8.tar.gz
   tar -zxvf nginx-1.17.8.tar.gz
   cd nginx-1.17.8
   yum install gcc zlib zlib-devel openssl openssl-devel pcre pcre-devel -y
   6).进入到nginx的源码目录下(编译nginx，包含与kafka整合的插件)
   cd /usr/local/src/nginx-1.17.8
   ./configure --add-module=/usr/local/src/ngx_kafka_module/
   make && make install
   
   7).修改nginx配置文件
   cd /usr/local/nginx/conf
   设置一个location和kafka的topic信息
   注意：
   1、 配置文件路径是：/usr/local/nginx/conf下的nginx.conf文件，不要与之前下载的nginx安装包目录
混淆！！
   2、 内容都是添加到http标签内
   完整文件参考
   vim nginx.conf
   
   #keepalive_timeout 0;
   keepalive_timeout 65;
   kafka;
   kafka_broker_list linux121:9092,linux122:9092,linux123:9092;
   #gzip on;

   location = /log/lagou_bus_info {
     kafka_topic lagou_bus_info;
   }
   8).创建kafka topic
   lagou_bus_info
   启动nginx
   /usr/local/nginx/sbin/nginx
   报错：
   /usr/local/nginx/sbin/nginx: error while loading shared libraries:
librdkafka.so.1: cannot open shared object file: No such file or directory
   解决方式
   echo "/usr/local/lib" >> /etc/ld.so.conf

   #手动加载
   ldconfig
   再次启动
   /usr/local/nginx/sbin/nginx
   #验证nginx启动
   # ps -ef | grep nginx
root       5350      1  0 00:03 ?        00:00:00 nginx: master process /usr/local/nginx/sbin/nginx
nobody     5351   5350  0 00:03 ?        00:00:00 nginx: worker process
root       5355   1280  0 00:03 pts/0    00:00:00 grep --color=auto nginx
   9).测试
   通过curl命令测试发送请求到Nginx
   curl http://linux123/log/lagou_bus_info -d "this is a msg from nginx to kafka"
   使用控制台消费者验证结果
   kafka-console-consumer.sh --bootstrap-server linux121:9092  --topic
lagou_bus_info --from-beginning

 2.车载客户端
   
   编写Java程序模拟车载客户端上传各种传感器采集的数据
package com.lg.collect;

import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DataClient {
    //调配编号
    static String[] deployArr = {"aaaa", "bbbb", "cccc", "dddd", "eeee"};
    //sim卡号
    static String[] simArr = {"1111", "2222", "3333", "4444", "5555"};
    //道路运输证
    static String[] transpotNumArr = {"jk1223", "kj2342",};
    //车牌号
    static String[] plateNumArr = {"kb8888", "kb6666"};
    //时间static
    //static String[] timeStrArr = {"1594076827", "1594076527", "1594076327"};
    //经纬度
    static String[] lglatArr = {"116.437355_39.989739",
            "116.382306_39.960325",
            "116.623784_40.034688",
            "116.32139_39.81157",
            "116.45551_39.944381",};
    //速度
    static String[] speedArr = {"50", "60", "70", "80"};
    //方向
    static String[] directionArr = {"west", "east", "south", "north"};
    //里程
    static String[] mileageArr = {"6000", "7000", "8000", "9000"};
    //剩余油量
    static String[] oilRemainArr = {"20", "30", "70", "80"};
    //载重质量
    static String[] weightsArr = {"500", "1000", "2000", "3000"};
    //ACC开关
    static String[] accArr = {"0", "1"};
    //是否定位
    static String[] locateArr = {"0", "1"};
    //车辆油路是否正常
    static String[] oilWayArr = {"0", "1"};
    //车辆电路是否正常
    static String[] electricArr = {"0", "1"};

    /**
     * * @param url
     * * @param msg
     * * @return
     *   
     */
    public static String httpPost(String url, String msg) {
        String returnValue = "这是默认返回值，接口调用失败";
        CloseableHttpClient httpClient = HttpClients.createDefault();
        ResponseHandler<String> responseHandler = new BasicResponseHandler();
        try {
            //第一步：创建HttpClient对象
            httpClient = HttpClients.createDefault();
            //第二步：创建httpPost对象
            HttpPost httpPost = new HttpPost(url);
            //第三步：给httpPost设置JSON格式的参数
            StringEntity requestEntity = new StringEntity(msg, "utf-8");
            requestEntity.setContentEncoding("UTF-8");
            httpPost.setEntity(requestEntity);
            //第四步：发送HttpPost请求，获取返回值
            httpClient.execute(httpPost, responseHandler);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //第五步：处理返回值
        return returnValue;
    }


    public static void main(String[] args) throws InterruptedException {
        String url = "http://linux123/log/lagou_bus_info";
        int n = 100;
        final Random rd = new Random();
        while (n > 0) {
            //拼接信息
            final StringBuilder sb = new StringBuilder();
            sb.append(deployArr[rd.nextInt(deployArr.length)]).append(",");
            sb.append(simArr[rd.nextInt(simArr.length)]).append(",");

            sb.append(transpotNumArr[rd.nextInt(transpotNumArr.length)]).append(",");
            sb.append(plateNumArr[rd.nextInt(plateNumArr.length)]).append(",");
            sb.append(lglatArr[rd.nextInt(lglatArr.length)]).append(",");
            sb.append(speedArr[rd.nextInt(speedArr.length)]).append(",");

            sb.append(directionArr[rd.nextInt(directionArr.length)]).append(",");
            sb.append(mileageArr[rd.nextInt(mileageArr.length)]).append(",");
            //sb.append(timeStrArr[rd.nextInt(timeStrArr.length)]).append(",");

            sb.append(oilRemainArr[rd.nextInt(oilRemainArr.length)]).append(",");
            sb.append(weightsArr[rd.nextInt(weightsArr.length)]).append(",");
            sb.append(accArr[rd.nextInt(accArr.length)]).append(",");
            sb.append(locateArr[rd.nextInt(locateArr.length)]).append(",");
            sb.append(oilWayArr[rd.nextInt(oilWayArr.length)]).append(",");
            sb.append(electricArr[rd.nextInt(electricArr.length)]);
            httpPost(url, sb.toString());
            TimeUnit.SECONDS.sleep(1);
            n--;
        }
    }
}

   